package com.ml4ai.flink.hadoopDFS

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.ArrayList

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, NullWritable, SequenceFile}
import org.apache.hadoop.io.SequenceFile._

import collection.JavaConverters._
import scala.reflect.ClassTag

/**
 * Flink [[OutputFormat]] that buffers incoming records and writes them to a Hadoop
 * `SequenceFile` in batches.
 *
 * Every parallel task writes its own file `<path>/<taskNumber>`. A batch is the buffered
 * records copied into an `Object[]`, Java-serialized, and appended as one `BytesWritable`
 * value under a `NullWritable` key. Records therefore have to be Java-serializable.
 *
 * Only the file belonging to the opening task is replaced in [[open]]; files left behind by
 * an earlier run with a higher parallelism are not removed and should be cleaned up before
 * the job starts if they are unwanted.
 *
 * @param path      output directory; one file per task is created beneath it
 * @param batchSize number of buffered records that triggers a write
 * @tparam A type of the records handed to [[writeRecord]]
 */
class HadoopValueBatchSerializeOutputFormat[A](val path: String, val batchSize: Int) extends OutputFormat[A] {

  /** Creates an output format with a batch size of 100 records. */
  def this(path: String) = this(path, 100)

  // Both fields are created in `open`; they are transient because the format instance itself
  // is serialized and shipped before `open` runs, so they stay null until then.
  @transient
  var buffer: ArrayList[A] = null

  @transient
  var writer: Writer = null

  /** Number of records currently held in `buffer`; reset to 0 by [[init]]. */
  var offset: Int = -1

  /** No configuration parameters are used. */
  override def configure(parameters: Configuration): Unit = {

  }

  /**
   * Writes any buffered records and closes the underlying writer.
   *
   * Failures while flushing or closing are propagated instead of being swallowed, because a
   * failed close can mean the data never reached the file. Calling this method again after
   * the writer has been released is a no-op.
   */
  override def close(): Unit = {
    val current = writer
    if (current != null) {
      try {
        flush()
      } catch {
        case flushFailure: Throwable =>
          // Still release the writer, but keep the flush failure as the primary error.
          writer = null
          try current.close()
          catch {
            case scala.util.control.NonFatal(closeFailure) => flushFailure.addSuppressed(closeFailure)
          }
          throw flushFailure
      }
      writer = null
      current.close()
    }
  }

  /**
   * Prepares the buffer and opens the sequence file `<path>/<taskNumber>` for writing.
   *
   * Only this task's own file is deleted beforehand: removing the whole output directory
   * here would race with sibling tasks that have already started writing into it.
   *
   * @param taskNumber index of this parallel task; used as the file name
   * @param numTasks   total number of parallel tasks (unused)
   */
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    init()
    val conf = new HadoopConfiguration()
    // Path(parent, child) joins the components without touching the "scheme://authority"
    // part of a fully qualified URI, which a plain "/+" -> "/" replacement would corrupt.
    val target = new Path(new Path(path), taskNumber.toString)
    // The FileSystem handle may be a cached instance shared across the JVM, so it is
    // deliberately not closed here.
    val fs = target.getFileSystem(conf)
    fs.delete(target, true)
    writer = SequenceFile.createWriter(
      conf,
      Writer.file(target),
      Writer.keyClass(classOf[NullWritable]),
      Writer.valueClass(classOf[BytesWritable]))
  }

  /**
   * Buffers one record and writes the batch out once `batchSize` records have accumulated.
   *
   * @param record the record to buffer
   */
  override def writeRecord(record: A): Unit = {
    buffer.add(record)
    offset += 1
    if (offset >= batchSize) {
      flush()
    }
  }

  /** Replaces the buffer with a fresh, empty one and resets the record counter. */
  def init(): Unit = {
    buffer = new ArrayList[A]
    offset = 0
  }

  /**
   * Serializes the buffered records as a single `Object[]` and appends them to the writer,
   * then resets the buffer. Does nothing when there is no buffer or it is empty.
   */
  def flush(): Unit = {
    if (buffer != null && !buffer.isEmpty) {
      // toArray() yields an Object[] regardless of A, which is what gets serialized.
      val batch: Array[AnyRef] = buffer.toArray()
      writer.append(NullWritable.get(), new BytesWritable(toData(batch)))
      init()
    }
  }

  /**
   * Java-serializes the given object into a byte array.
   *
   * @param `var` the object to serialize
   * @return the bytes produced by an `ObjectOutputStream` for that object
   */
  def toData(`var`: AnyRef): Array[Byte] = {
    val sink = new ByteArrayOutputStream
    val out = new ObjectOutputStream(sink)
    try {
      out.writeObject(`var`)
      out.flush()
      sink.toByteArray
    } finally {
      // In-memory streams: closing cannot lose data, so a quiet close is sufficient.
      CloseableUtils.closeQuietly(out)
      CloseableUtils.closeQuietly(sink)
    }
  }
}
